package drds.plus.executor.record_codec;

import drds.plus.executor.record_codec.record.FixedLengthRecord;
import drds.plus.executor.record_codec.record.Record;
import drds.plus.sql_process.abstract_syntax_tree.configuration.ColumnMetaData;
import drds.plus.sql_process.type.DecodeResult;
import drds.plus.sql_process.type.Type;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * {@link RecordCodec} that turns a {@link Record} into a single {@code byte[]} and back.
 *
 * <p>Columns are written back-to-back in the order of {@code columnMetaDataList}; each
 * column's {@link Type} performs the actual encoding/decoding of its value and reports
 * how many bytes it produced or consumed.
 */
public class FixedLengthRecordCodec implements RecordCodec<byte[]> {

    /** Column definitions, in the order their values are laid out in the byte array. */
    List<ColumnMetaData> columnMetaDataList;

    /**
     * Column name to slot index. Indexes are handed out in first-seen order, so a column
     * name that appears more than once in the list keeps the index of its first occurrence.
     */
    Map<String, Integer> columnNameToIndexMap;

    /**
     * Creates a codec for the given columns and builds the name-to-index lookup.
     *
     * @param columnMetaDataList column definitions in encoding order
     */
    public FixedLengthRecordCodec(List<ColumnMetaData> columnMetaDataList) {
        this.columnMetaDataList = columnMetaDataList;
        // Explicit type arguments instead of a raw HashMap (avoids an unchecked conversion).
        this.columnNameToIndexMap = new HashMap<String, Integer>(columnMetaDataList.size());
        for (ColumnMetaData columnMetaData : columnMetaDataList) {
            String columnName = columnMetaData.getColumnName();
            if (!columnNameToIndexMap.containsKey(columnName)) {
                // The current map size is the next free index (acts like index++).
                columnNameToIndexMap.put(columnName, columnNameToIndexMap.size());
            }
        }
    }

    /**
     * Encodes every column value of {@code record} into one byte array.
     *
     * @param record source of the column values, looked up by column name
     * @return the concatenated encoded column values
     * @throws RuntimeException if a column that is not nullable has a {@code null} value
     */
    public byte[] encode(Record record) {
        // The sizing pass also validates nullability, so the "is not nullable" error is
        // raised before any Type method is asked to handle an unexpected null.
        int length = calculateLength(record, columnMetaDataList);
        byte[] bytes = new byte[length];
        int offset = 0;
        for (ColumnMetaData columnMetaData : columnMetaDataList) {
            Object object = record.get(columnMetaData.getColumnName());
            Type type = columnMetaData.getType();
            // encode(...) returns the number of bytes written at 'offset'.
            offset += type.encode(object, bytes, offset);
        }
        return bytes;
    }

    /**
     * Validates the record against the column definitions and sums the encoded length
     * of all column values.
     *
     * @param record             source of the column values
     * @param columnMetaDataList columns to measure
     * @return total number of bytes needed to encode the record
     * @throws RuntimeException if a column that is not nullable has a {@code null} value
     */
    private int calculateLength(Record record, List<ColumnMetaData> columnMetaDataList) {
        int length = 0;
        for (ColumnMetaData columnMetaData : columnMetaDataList) {
            Object object = record.get(columnMetaData.getColumnName());
            // Check before calling into Type so the failure carries the column information.
            if (object == null && !columnMetaData.isNullable()) {
                throw new RuntimeException(columnMetaData + " is not nullable.");
            }
            Type type = columnMetaData.getType();
            // getLength(...) returns the encoded length of this value.
            length += type.getLength(object);
        }
        return length;
    }

    /**
     * Decodes a byte array into a new {@link FixedLengthRecord}, reading the columns in
     * the order of {@code columnMetaDataList}.
     *
     * @param bytes encoded record, may be {@code null}
     * @return the decoded record, or {@code null} when {@code bytes} is {@code null}
     */
    public Record decode(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        FixedLengthRecord fixedLengthRecord = new FixedLengthRecord(columnNameToIndexMap, columnMetaDataList);
        int offset = 0;
        for (ColumnMetaData columnMetaData : columnMetaDataList) {
            Type type = columnMetaData.getType();
            DecodeResult decodeResult = type.decode(bytes, offset);
            // decodeResult.length is the number of bytes consumed; advance to the next column.
            offset += decodeResult.length;
            // decodeResult.value is the decoded column value; store it in the column's slot.
            fixedLengthRecord.setValue(columnNameToIndexMap.get(columnMetaData.getColumnName()), decodeResult.value);
        }
        return fixedLengthRecord;
    }

    /**
     * Creates a new {@link FixedLengthRecord} backed by this codec's column layout.
     *
     * @return a freshly constructed record
     */
    public Record newRecord() {
        return new FixedLengthRecord(columnNameToIndexMap, columnMetaDataList);
    }

}
